Skip to content

[SPARK-40193][SQL] Merge subplans with different filter conditions#55298

Open
peter-toth wants to merge 1 commit intoapache:masterfrom
peter-toth:SPARK-40193-merge-filters-2
Open

[SPARK-40193][SQL] Merge subplans with different filter conditions#55298
peter-toth wants to merge 1 commit intoapache:masterfrom
peter-toth:SPARK-40193-merge-filters-2

Conversation

@peter-toth
Copy link
Copy Markdown
Contributor

@peter-toth peter-toth commented Apr 10, 2026

What changes were proposed in this pull request?

PlanMerger is extended to merge non-correlated non-grouping aggregate subplans that differ only in their WHERE filter conditions.

Filter merging follows the same recursive plan-matching logic as the rest of PlanMerger and handles three cases:

  • (np: Filter, cp: Filter) with different conditions: both conditions are aliased as boolean attributes in a Project, a merged Filter(OR(f0, f1)) is introduced, and the aliases are propagated up to the enclosing Aggregate where each side's expressions receive a FILTER (WHERE ...) clause.
  • (np: Filter, cp) or (np, cp: Filter): only one side has a filter; the condition is exposed as a Project attribute and propagated up so only that side's aggregate expressions receive a FILTER clause.
  • Equal filter conditions pass through unchanged.

When plans also differ in intermediate Project expressions above a Filter, those expressions are wrapped with If(filterAttr, expr, null) to avoid computing them for rows that do not match that side's filter condition.

Example

// Input plans
Aggregate [sum(a) AS sum_a]         Aggregate [max(d) AS max_d]
+- Filter (a < 1)                   +- Project [udf(a) AS d]
   +- Scan t                           +- Filter (a > 1)
                                          +- Scan t

// Merged plan
Aggregate [sum(a) FILTER (WHERE f0) AS sum_a, max(d0) FILTER (WHERE f1) AS max_d]
+- Project [a, If(f1, udf(a), null) AS d0, f0, f1]
   +- Filter (f0 OR f1)
      +- Project [a, (a < 1) AS f0, (a > 1) AS f1]
         +- Scan t

Benefit: a single scan of t computes both aggregates, which is typically cheaper than two separate scans.

Drawback (symmetric case only): the merged Filter(f0 OR f1) is less selective than each individual filter, which may reduce IO pruning such as partition or file skipping. On heavily partitioned or file-pruned tables the extra IO can outweigh the scan-deduplication benefit. The asymmetric case ((np: Filter, cp)) is always beneficial because the unfiltered side would have read all the data anyway.

Configs:

  • spark.sql.planMerge.filterPropagation.enabled (internal, default true): master switch; disabling it turns off all filter-based merging.
  • spark.sql.planMerge.symmetricFilterPropagation.enabled (default true): controls the symmetric (Filter, Filter) case specifically, so users on IO-pruning-sensitive workloads can disable only that path while keeping the always-beneficial asymmetric merging.

MergeResult.outputMap is changed from AttributeMap[Attribute] to AttributeMap[Int], mapping each input plan attribute to its positional index in the merged output. Positional indices remain stable across subsequent PlanMerger.merge calls (outputs are only ever appended), whereas retained Attribute values can become stale when filter merging replaces expressions with new aliases. This also simplifies the two call sites in MergeSubplans.

Why are the changes needed?

Computing aggregates over the same table with different WHERE clauses is a common analytical pattern (e.g. conditional sums or counts for different predicates). Without this change each subquery forces a separate full scan; merging them reduces scan count and overall query cost.

Does this PR introduce any user-facing change?

Yes. A new config spark.sql.planMerge.symmetricFilterPropagation.enabled (default true) is added. The optimization is otherwise transparent: queries produce the same results, and both configs can be set to false to restore the previous behavior.

How was this patch tested?

New unit tests in MergeSubplansSuite and new end-to-end tests in PlanMergeSuite covering the basic two-subplan cases, three-subplan merging, disabled configs, grouping aggregates (not merged), asymmetric filters, stacked filters, and reversed filter ordering.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6

@peter-toth peter-toth force-pushed the SPARK-40193-merge-filters-2 branch 3 times, most recently from 219b19f to 66f7683 Compare April 12, 2026 18:38
### What changes were proposed in this pull request?

`PlanMerger` is extended to merge non-correlated non-grouping aggregate subplans
that differ only in their `WHERE` filter conditions.

Filter merging follows the same recursive plan-matching logic as the rest of
`PlanMerger` and handles three cases:

- `(np: Filter, cp: Filter)` with different conditions: both conditions are
  aliased as boolean attributes in a `Project`, a merged `Filter(OR(f0, f1))`
  is introduced, and the aliases are propagated up to the enclosing `Aggregate`
  where each side's expressions receive a `FILTER (WHERE ...)` clause.
- `(np: Filter, cp)` or `(np, cp: Filter)`: only one side has a filter; the
  condition is exposed as a `Project` attribute and propagated up so only that
  side's aggregate expressions receive a `FILTER` clause.
- Equal filter conditions pass through unchanged.

When plans also differ in intermediate `Project` expressions above a `Filter`,
those expressions are wrapped with `If(filterAttr, expr, null)` to avoid
computing them for rows that do not match that side's filter condition. Plain
attribute references are never wrapped since reading a column value is free.

**Example**

```
// Input plans
Aggregate [sum(a) AS sum_a]         Aggregate [max(d) AS max_d]
+- Filter (a < 1)                   +- Project [udf(a) AS d]
   +- Scan t                           +- Filter (a > 1)
                                          +- Scan t

// Merged plan
Aggregate [sum(a) FILTER f0 AS sum_a, max(d0) FILTER f1 AS max_d]
+- Project [a, If(f1, udf(a), null) AS d0, f0, f1]
   +- Filter (f0 OR f1)
      +- Project [a, (a < 1) AS f0, (a > 1) AS f1]
         +- Scan t
```

**Benefit**: a single scan of `t` computes both aggregates, which is typically
cheaper than two separate scans. The `If` wrapping ensures `udf(a)` is only
evaluated for rows that match `a > 1`.

**Drawback** (symmetric case only): the merged `Filter(f0 OR f1)` is less
selective than each individual filter, which may reduce IO pruning such as
partition or file skipping. On heavily partitioned or file-pruned tables the
extra IO can outweigh the scan-deduplication benefit. The asymmetric case
(`(np: Filter, cp)`) is always beneficial because the unfiltered side would
have read all the data anyway.

**Configs**:

- `spark.sql.planMerge.filterPropagation.enabled` (internal, default `true`):
  master switch; disabling it turns off all filter-based merging.
- `spark.sql.planMerge.symmetricFilterPropagation.enabled` (default `true`):
  controls the symmetric `(Filter, Filter)` case specifically, so users on
  IO-pruning-sensitive workloads can disable only that path while keeping the
  always-beneficial asymmetric merging.

`MergeResult.outputMap` is changed from `AttributeMap[Attribute]` to
`AttributeMap[Int]`, mapping each input plan attribute to its positional index
in the merged output. Positional indices remain stable across subsequent
`PlanMerger.merge` calls (outputs are only ever appended), whereas retained
`Attribute` values can become stale when filter merging replaces expressions
with new aliases. This also simplifies the two call sites in `MergeSubplans`.

### Why are the changes needed?

Computing aggregates over the same table with different `WHERE` clauses is a
common analytical pattern (e.g. conditional sums or counts for different
predicates). Without this change each subquery forces a separate full scan;
merging them reduces scan count and overall query cost.

### Does this PR introduce _any_ user-facing change?

Yes. A new config `spark.sql.planMerge.symmetricFilterPropagation.enabled`
(default `true`) is added. The optimization is otherwise transparent: queries
produce the same results, and both configs can be set to `false` to restore
the previous behavior.

### How was this patch tested?

New unit tests in `MergeSubplansSuite` and new end-to-end tests in
`PlanMergeSuite` covering the basic two-subplan cases, three-subplan merging,
disabled configs, grouping aggregates (not merged), asymmetric filters, stacked
filters, and reversed filter ordering.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6 (Anthropic)
@peter-toth peter-toth force-pushed the SPARK-40193-merge-filters-2 branch from 66f7683 to 5f95716 Compare April 16, 2026 13:36
@peter-toth peter-toth changed the title [WIP][SPARK-40193][SQL] Merge subplans with different filter conditions [SPARK-40193][SQL] Merge subplans with different filter conditions Apr 16, 2026
@peter-toth peter-toth marked this pull request as ready for review April 16, 2026 13:37
@peter-toth
Copy link
Copy Markdown
Contributor Author

I measured the following improvements with the affected queries:

[info] TPCDS:                                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q2 - filter propagation disabled                   3021           3215         275          3.6         278.9       1.0X
[info] q2 - filter propagation enabled                    2766           2810          61          3.9         255.4       1.1X

[info] TPCDS:                                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q9 - filter propagation disabled                   5780           5914         190          0.0   148204287.4       1.0X
[info] q9 - filter propagation enabled                    1583           1586           4          0.0    40582467.9       3.7X

[info] TPCDS:                                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q28 - filter propagation disabled                 11715          12050         474          1.2         851.6       1.0X
[info] q28 - filter propagation enabled                   3464           3568         148          4.0         251.8       3.4X

[info] TPCDS:                                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q59 - filter propagation disabled                  2356           2469         160          5.9         170.4       1.0X
[info] q59 - filter propagation enabled                   2255           2283          39          6.1         163.0       1.0X

@peter-toth
Copy link
Copy Markdown
Contributor Author

@LuciferYang this is now ready for review.

@LuciferYang
Copy link
Copy Markdown
Contributor

Thanks for pinging me. I will provide feedback tomorrow.

@peter-toth
Copy link
Copy Markdown
Contributor Author

cc @cloud-fan, @dongjoon-hyun, @yaooqinn as well

Copy link
Copy Markdown
Contributor

@LuciferYang LuciferYang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall it looks good.

val newNPCondition = npFilter.fold(mappedNPCondition) {
case (f, _) => And(f, mappedNPCondition)
}
val childProject = mergedChild.asInstanceOf[Project]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: mergedChild.asInstanceOf[Project] will throw ClassCastException if a future change breaks the invariant that a MERGED_FILTER_TAG filter always has a Project child. Consider a pattern match with an explicit error so the failure mode is clear:

  val childProject = mergedChild match {
    case p: Project => p
    case other => throw SomeException 
   }

.createOptional

val PLAN_MERGE_FILTER_PROPAGATION_ENABLED =
buildConf("spark.sql.planMerge.filterPropagation.enabled")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spark.sql.planMerge.* namespace is new — no other configs use it. Should this live under spark.sql.optimizer.* for discoverability, alongside spark.sql.optimizer.runtime.bloomFilter.enabled and similar? E.g. spark.sql.optimizer.mergeSubplans.filterPropagation.enabled, which also matches the rule name MergeSubplans.

.version("4.2.0")
.withBindingPolicy(ConfigBindingPolicy.SESSION)
.booleanConf
.createWithDefault(true)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The symmetric case broadens the combined filter to OR(f1, f2), which can reduce IO pruning on partitioned or file-pruned tables (as the doc itself notes). For a brand-new optimization shipped enabled-by-default, this can cause silent regressions on upgrade. Should this default to false for the first release, with users opting in once the behavior is validated in production? The asymmetric case (always beneficial) can keep its default.

comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze)
}

test("SPARK-40193: Merge non-grouping subqueries with different filter conditions") {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add three more tests to lock down the contract on the following paths?

  1. Distinct aggregates: a test using countDistinct($"a") (or any isDistinct = true aggregate) on each side, asserting either correct merge with FILTER clauses or correct rejection. Without it, the supported/unsupported boundary in supportedAggregateMerge is implicit.

  2. If wrapping with computed expressions: a test where each side has a Project between Filter and Aggregate containing a non-attribute expression (e.g. udf($"a") AS d or ($"a" + 1) AS d). The PlanMerger scaladoc example (around line 100 in PlanMerger.scala) shows this scenario, but no test currently exercises the If(filterAttr, expr, null) wrapping branch in mergeNamedExpressions for non-attribute expressions.

  3. Pre-existing FILTER on aggregate expressions: a test where one side's aggregate already carries a FILTER clause (e.g. count($"a") FILTER (WHERE $"b" > 0)). applyFilterToAggregateExpressions combines via And(propagatedFilter, existingFilter) but no test covers this combination.

Comment on lines +332 to +333
if (cp.getTagValue(PlanMerger.MERGED_FILTER_TAG).isDefined) {
// cp Filter is already a merged filter from a previous round: its condition
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:Indentation

Some((newNPFilter, true)), None))
}
} else {
// First-time filter propagation: alias both sides' conditions as boolean
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Indentation

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants